AWS CDK でStep FunctionsのAPIを作成してみた。
概要
この記事では、AWS CDKで同期エクスプレスステートマシンを使用して API Gateway REST API を作成しました。ここでは、StepFunctionsを呼び出すためのHTTP「ANY」メソッドAPIを実装しました。StepFunctionsRestApiコンストラクトを使用して、ステートマシンをAPI Gatewayに接続しました。HTTPS リクエストが API メソッドに送信されると、API Gateway はStep Functions API アクションを呼び出します。
やってみた
CDKアプリの作成
- 次のコマンドを使用してCDKをインストールしておきます。
python -m pip install aws-cdk-lib
- 新しいディレクトリを作成しておきます。
- CDKは、プロジェクトディレクトリの名前に基づいてソースファイルとクラスに名前を付けます。
#create new directory mkdir api-stepfunctions cd api-stepfunctions
- cdk initコマンドを使用してアプリを初期化しておきます。
cdk init --language python
- 次の2つのコマンドを実行して、アプリのPython virtual environmentをアクティブ化し、CDK core dependenciesをインストールしておきます。
source .venv/bin/activate pip install -r requirements.txt
サービスの作成
- 新しいファイル [api_stepfunctions/services.py] を作成して、作成する必要のあるAWSサービスを定義しておきます。
ステートマシンを定義する
- パス状態でMachine Definition を定義しておきます。
invoke_state = stepfunctions.Pass(self,"InvokeState", result = stepfunctions.Result("API Gateway Invokes Step Functions")) success_state = stepfunctions.Pass(self,"SuccessState", result = stepfunctions.Result("Successfully invoked")) machine_definition = invoke_state.next(success_state)
- Machine Definition とEXPRESSタイプでステートマシンを定義しておきます。
- StepFunctionsRestApiは同期Expressステートマシンのみを許可するため、ステートマシンタイプはEXPRESSとして設定されます。
state_machine = stepfunctions.StateMachine(self, 'StateMachine', definition = machine_definition, state_machine_name = "ApiStepFunctions", state_machine_type = stepfunctions.StateMachineType.EXPRESS)
REST APIを定義する
- StepFunctionsRestApiコンストラクトを使用してAPIGateway RESTAPIを作成します。
api = apigateway.StepFunctionsRestApi(self, "StepFunctionsApi", deploy=True, state_machine=state_machine)
完全なコード
- すべてのサービスが定義された完全なコードです。
import aws_cdk as cdk from constructs import Construct from aws_cdk import ( aws_stepfunctions as stepfunctions, aws_apigateway as apigateway ) class serviceStack(Construct): def __init__(self, scope: Construct, id: str): super().__init__(scope, id) invoke_state = stepfunctions.Pass(self,"InvokeState", result = stepfunctions.Result("API Gateway Invokes Step Functions")) success_state = stepfunctions.Pass(self,"SuccessState", result = stepfunctions.Result("Successfull invoked")) machine_definition = invoke_state.next(success_state) state_machine = stepfunctions.StateMachine(self, 'StateMachine', definition = machine_definition, state_machine_name = "ApiStepFunctions", state_machine_type = stepfunctions.StateMachineType.EXPRESS) api = apigateway.StepFunctionsRestApi(self, "StepFunctionsApi", deploy=True, state_machine=state_machine)
アプリにサービスを追加する
- /api_stepfunctions/api_stepfunctions_stack.py ファイルに次のコードを追加しておきます。
#Import the Service file created in the previou step from . import services #service_file.constructor services.serviceStack(self, "api-invoke-step-functions")
CDK Deploy
- Deploy する前に、環境をブートストラップする必要があります。
- 次のコマンドを実行して、AWS環境をブートストラップしておきます。
cdk bootstrap
- 次のコマンドを使用してCDKを展開しておきます。
cdk deploy
- コンソールでは、サービスが作成されたことを見ることができます。
State Machine
State Machine Definition
API Gateway
APIをテストする
- API Gatewayのコンソールで、APIを選択して、Resources ペインで、テストするメソッドを選択しておきます。
- Method Execution ペインで、Testを選択しておきます。
- POSTメソッドを選択して、Testをクリックしておきます。
- Step Functionsの出力を見ることができます。
まとめ
AWS CDK でStep FunctionsのAPIを作成してみました。
Reference :